package com.example;

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Manual experiment for Pulsar redelivery behaviour on a Shared, Durable subscription.
 *
 * <p>The program subscribes to {@code topic1} and {@code topic2} with retry enabled and a
 * dead-letter policy, subscribes a second consumer to the expected dead-letter topic,
 * publishes one message to {@code topic1}, and then keeps the process alive for 25 minutes
 * so the redelivery intervals printed by the listener can be observed.
 *
 * <p>All Pulsar resources are opened in a try-with-resources statement so they are closed
 * (in reverse order of creation) even when subscribing, producing or sleeping fails.
 */
public class SubscriptionStudy {

    /**
     * Runs the experiment.
     *
     * @param args unused
     * @throws PulsarClientException if creating, using or closing any Pulsar resource fails
     * @throws InterruptedException  if the 25-minute observation sleep is interrupted
     */
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        // Wall-clock time (ms) of the previous listener invocation; 0 means "nothing received yet".
        AtomicLong timestamp = new AtomicLong(0);

        /*
        Additional consumers used in earlier experiments. If re-enabled, add them to the
        try-with-resources header below so they are closed as well.

        Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionName("topic1-subscription1")
                .topic("topic1", "topic2")
                .consumerName("李四")
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    System.out.println("topic1-subscription1 消费者" + consumer1 + "收到消息：" + msg.getValue());
                    try {
                        consumer1.acknowledge(msg.getMessageId());
                    } catch (PulsarClientException e) {
                        throw new RuntimeException(e);
                    }
                })
                .subscribe();

        Consumer<String> consumer3 = client.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionName("topic1-subscription2")
                .topic("topic1", "topic2")
                .consumerName("王五")
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    System.out.println("topic1-subscription2 消费者" + consumer1 + "收到消息：" + msg.getValue());
                    try {
                        consumer1.acknowledge(msg.getMessageId());
                    } catch (PulsarClientException e) {
                        throw new RuntimeException(e);
                    }
                })
                .subscribe();*/

        // NOTE(review): producer0 (Shared) is already attached to topic1 when producer
        // (Exclusive) is created. Per Pulsar's producer access-mode documentation an Exclusive
        // producer may be rejected while another producer is connected — confirm this
        // combination is the intended experiment.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("http://192.168.0.106:8080")
                     .build();
             Consumer<String> consumer = subscribeRetryingConsumer(client, timestamp);
             Consumer<String> consumer4 = subscribeDeadLetterConsumer(client);
             Producer<String> producer0 = client.newProducer(Schema.STRING)
                     .producerName("topic1-producer0")
                     .topic("topic1")
                     .accessMode(ProducerAccessMode.Shared)
                     .create();
             Producer<String> producer = client.newProducer(Schema.STRING)
                     .producerName("topic1-producer1")
                     .topic("topic1")
                     .accessMode(ProducerAccessMode.Exclusive)
                     .create()) {

            producer.send("发送到topic1的第一条消息");
            //producer.send("发送到topic1的第二条消息");
            //producer.send("发送到topic1的第三条消息");

            // Keep the process (and the listeners) alive long enough to watch the redeliveries.
            TimeUnit.MINUTES.sleep(25);
        }
    }

    /**
     * Subscribes the consumer under study: Shared/Durable subscription
     * {@code topic1-subscription1} on {@code topic1} and {@code topic2}, with retry enabled
     * and a dead-letter policy configured with {@code maxRedeliverCount(5)}.
     *
     * @param client    client used to build the consumer
     * @param timestamp holder for the time of the previous delivery, shared with the listener
     * @return the subscribed consumer; the caller is responsible for closing it
     * @throws PulsarClientException if the subscription cannot be created
     */
    private static Consumer<String> subscribeRetryingConsumer(PulsarClient client, AtomicLong timestamp)
            throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionName("topic1-subscription1")
                .topic("topic1", "topic2")
                .consumerName("张三")
                // Alternative redelivery mechanisms tried in this study:
                //.negativeAckRedeliveryDelay(2, TimeUnit.MINUTES)
                //.ackTimeout(10, TimeUnit.SECONDS)
                //.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
                //        .minDelayMs(1000)
                //        .maxDelayMs(60*1000)
                //        .multiplier(2)
                //        .build())
                .enableRetry(true)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(5)
                        .initialSubscriptionName("init-subs-of-dlq")
                        //.retryLetterTopic("my-retry-letter-topic-name")
                        .build())
                .messageListener((MessageListener<String>) (consumer1, msg) ->
                        onRetryableMessage(consumer1, msg, timestamp))
                .subscribe();
    }

    /**
     * Listener body for the consumer under study: prints whether this is the first delivery
     * or how many seconds passed since the previous one, then requests another delivery of
     * the message after 5 seconds via {@code reconsumeLater}.
     *
     * @param consumer1 consumer that received the message
     * @param msg       the received message
     * @param timestamp holder for the time of the previous delivery; updated on every call
     */
    private static void onRetryableMessage(Consumer<String> consumer1, Message<String> msg,
                                           AtomicLong timestamp) {
        // Capture "now" once: re-reading the AtomicLong after getAndSet could observe a value
        // written by a concurrent listener invocation and yield a wrong interval.
        long now = System.currentTimeMillis();
        long old = timestamp.getAndSet(now);
        if (old != 0) {
            // A previous delivery was recorded, so this one is treated as a redelivery.
            long seconds = Duration.ofMillis(now - old).toSeconds();
            System.out.println("收到重新投递的消息，耗时" + seconds + "秒");
        } else {
            System.out.println("首次收到投递的消息");
        }

        // Alternative outcomes tried in this study:
        //ack(consumer1, msg);
        //consumer1.negativeAcknowledge(msg);
        //throw new RuntimeException("ex");

        try {
            consumer1.reconsumeLater(msg, 5, TimeUnit.SECONDS);
        } catch (PulsarClientException e) {
            throw new RuntimeException("reconsumeLater failed for message " + msg.getMessageId(), e);
        }
    }

    /**
     * Subscribes a consumer to {@code topic1-topic1-subscription1-DLQ} using subscription
     * {@code init-subs-of-dlq}; it prints each received value and acknowledges it.
     *
     * @param client client used to build the consumer
     * @return the subscribed consumer; the caller is responsible for closing it
     * @throws PulsarClientException if the subscription cannot be created
     */
    private static Consumer<String> subscribeDeadLetterConsumer(PulsarClient client)
            throws PulsarClientException {
        return client.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionMode(SubscriptionMode.Durable)
                .subscriptionName("init-subs-of-dlq")
                .topic("topic1-topic1-subscription1-DLQ")
                .consumerName("张三（DLQ）")
                .messageListener((MessageListener<String>) (consumer, msg) -> {
                    System.out.println("收到死信消息：" + msg.getValue());
                    try {
                        consumer.acknowledge(msg);
                    } catch (PulsarClientException e) {
                        throw new RuntimeException(
                                "acknowledge failed for dead-letter message " + msg.getMessageId(), e);
                    }
                })
                .subscribe();
    }

    /**
     * Acknowledges {@code msg} by its message id, rethrowing any client failure unchecked so
     * it can be called from a listener lambda. Currently only referenced from commented-out
     * experiment variants.
     *
     * @param consumer1 consumer that received the message
     * @param msg       message to acknowledge
     */
    private static void ack(Consumer<String> consumer1, Message<String> msg) {
        try {
            consumer1.acknowledge(msg.getMessageId());
        } catch (PulsarClientException e) {
            throw new RuntimeException("acknowledge failed for message " + msg.getMessageId(), e);
        }
    }
}
